package com.qf.kafka;

import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;

/**
 * @author Thor
 * @公众号 Java架构栈
 */
public class MyProducerDemo1 {

    private static final String TOPIC_NAME = "my-cluster-topic";

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //1.设置参数
        Properties props = new Properties();
        //1.1 设置集群ip
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.10.137:9092,192.168.10.137:9093,192.168.10.137:9094");
        //1.2 设置序列化参数
        //把发送的key从字符串序列化为字节数组
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //把发送消息value从字符串序列化为字节数组
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //配置ack
        props.put(ProducerConfig.ACKS_CONFIG, "1");

        //设置发送消息的本地缓冲区，如果设置了该缓冲区，消息会先发送到本地缓冲区，可以提高消息发送性能，默认值是33554432，即32MB
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        /*
        kafka本地线程会从缓冲区取数据，批量发送到broker，
        设置批量发送消息的大小，默认值是16384，即16kb，就是说一个batch满了16kb就发送出去
        */
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        /*
        默认值是0，意思就是消息必须立即被发送，但这样会影响性能
        一般设置10毫秒左右，就是说这个消息发送完后会进入本地的一个batch，如果10毫秒内，这个batch满了16kb就会随batch一起被发送出去
        如果10毫秒内，batch没满，那么也必须把消息发送出去，不能让消息的发送延迟时间太长
        */
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);


        //2.创建一个发送消息的生产者客户端，传入之前设置好的参数对象
        Producer<String, String> producer = new KafkaProducer<String, String>(props);

        CountDownLatch countDownLatch = new CountDownLatch(5);


        for (int i = 5; i < 10; i++) {
            //3.创建一个消息对象
//            ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME, "my-record-key"+i, "hellojavaa");
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME, "my-record-key-" + i, "helloworld-" + i);
            //4.发送消息
            //同步发送,并获得返回的结果，并答应
//            RecordMetadata metadata = producer.send(producerRecord).get();
//            System.out.println("同步方式发送消息结果：" + "topic-" + metadata.topic() + "|partition-"
//                    + metadata.partition() + "|offset-" + metadata.offset());
            producer.send(producerRecord, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        System.out.println("消息发送失败了：" + exception.getMessage());
                        return;
                    }
                    if (metadata != null) {
                        System.out.println("异步方式发送消息结果：" + "topic->" + metadata.topic() + " | partition->"
                                + metadata.partition() + " | offset->" + metadata.offset());
                    }
                    countDownLatch.countDown();
                }
            });
            System.out.println("aaaaaaaaa");
        }

        countDownLatch.await();

    }


}
